Client
The bot client.
Note
By default, all non-privileged intents will be enabled
Attributes:
Name | Type | Description |
---|---|---|
intents |
Union[int, Intents]: The intents to use |
|
default_prefix |
None |
Union[str, Iterable[str]]: The default prefix (or prefixes) to use for prefixed commands. Defaults to your bot being mentioned. |
generate_prefixes |
None |
Callable[..., Coroutine]: A coroutine that returns a string or an iterable of strings to determine prefixes. |
status |
Status: The status the bot should log in with (IE ONLINE, DND, IDLE) |
|
activity |
Union[Activity, str]: The activity the bot should log in "playing" |
|
sync_interactions |
None |
bool: Should application commands be synced with discord? |
delete_unused_application_cmds |
bool: Delete any commands from discord that aren't implemented in this client |
|
enforce_interaction_perms |
bool: Enforce discord application command permissions, locally |
|
fetch_members |
None |
bool: Should the client fetch members from guilds upon startup (this will delay the client being ready) |
auto_defer |
None |
AutoDefer: A system to automatically defer commands after a set duration |
interaction_context |
Type[InteractionContext] |
Type[InteractionContext]: InteractionContext: The object to instantiate for Interaction Context |
prefixed_context |
Type[PrefixedContext] |
Type[PrefixedContext]: The object to instantiate for Prefixed Context |
component_context |
Type[ComponentContext] |
Type[ComponentContext]: The object to instantiate for Component Context |
autocomplete_context |
Type[AutocompleteContext] |
Type[AutocompleteContext]: The object to instantiate for Autocomplete Context |
modal_context |
Type[ModalContext] |
Type[ModalContext]: The object to instantiate for Modal Context |
hybrid_context |
Type[HybridContext] |
Type[HybridContext]: The object to instantiate for Hybrid Context |
global_pre_run_callback |
Callable[..., Coroutine]: A coroutine to run before every command is executed |
|
global_post_run_callback |
Callable[..., Coroutine]: A coroutine to run after every command is executed |
|
send_command_tracebacks |
bool |
bool: Should the traceback of command errors be sent in reply to the command invocation |
total_shards |
int: The total number of shards in use |
|
shard_id |
int: The zero based int ID of this shard |
|
debug_scope |
None |
Snowflake_Type: Force all application commands to be registered within this scope |
asyncio_debug |
bool: Enable asyncio debug features |
|
basic_logging |
bool: Utilise basic logging to output library data to console. Do not use in combination with |
|
logging_level |
int: The level of logging to use for basic_logging. Do not use in combination with |
|
logger |
None |
logging.Logger: The logger NAFF should use. Do not use in combination with |
Optionally, you can configure the caches here, by specifying the name of the cache, followed by a dict-style object to use.
It is recommended to use smart_cache.create_cache
to configure the cache here.
as an example, this is a recommended attribute message_cache=create_cache(250, 50)
,
Note
Setting a message cache hard limit to None is not recommended, as it could result in extremely high memory usage, we suggest a sane limit.
Source code in naff/client/client.py
class Client(
processors.AutoModEvents,
processors.ChannelEvents,
processors.GuildEvents,
processors.MemberEvents,
processors.MessageEvents,
processors.ReactionEvents,
processors.RoleEvents,
processors.StageEvents,
processors.ThreadEvents,
processors.UserEvents,
processors.VoiceEvents,
):
"""
The bot client.
note:
By default, all non-privileged intents will be enabled
Attributes:
intents: Union[int, Intents]: The intents to use
default_prefix: Union[str, Iterable[str]]: The default prefix (or prefixes) to use for prefixed commands. Defaults to your bot being mentioned.
generate_prefixes: Callable[..., Coroutine]: A coroutine that returns a string or an iterable of strings to determine prefixes.
status: Status: The status the bot should log in with (IE ONLINE, DND, IDLE)
activity: Union[Activity, str]: The activity the bot should log in "playing"
sync_interactions: bool: Should application commands be synced with discord?
delete_unused_application_cmds: bool: Delete any commands from discord that aren't implemented in this client
enforce_interaction_perms: bool: Enforce discord application command permissions, locally
fetch_members: bool: Should the client fetch members from guilds upon startup (this will delay the client being ready)
auto_defer: AutoDefer: A system to automatically defer commands after a set duration
interaction_context: Type[InteractionContext]: InteractionContext: The object to instantiate for Interaction Context
prefixed_context: Type[PrefixedContext]: The object to instantiate for Prefixed Context
component_context: Type[ComponentContext]: The object to instantiate for Component Context
autocomplete_context: Type[AutocompleteContext]: The object to instantiate for Autocomplete Context
modal_context: Type[ModalContext]: The object to instantiate for Modal Context
hybrid_context: Type[HybridContext]: The object to instantiate for Hybrid Context
global_pre_run_callback: Callable[..., Coroutine]: A coroutine to run before every command is executed
global_post_run_callback: Callable[..., Coroutine]: A coroutine to run after every command is executed
send_command_tracebacks: bool: Should the traceback of command errors be sent in reply to the command invocation
total_shards: int: The total number of shards in use
shard_id: int: The zero based int ID of this shard
debug_scope: Snowflake_Type: Force all application commands to be registered within this scope
asyncio_debug: bool: Enable asyncio debug features
basic_logging: bool: Utilise basic logging to output library data to console. Do not use in combination with `Client.logger`
logging_level: int: The level of logging to use for basic_logging. Do not use in combination with `Client.logger`
logger: logging.Logger: The logger NAFF should use. Do not use in combination with `Client.basic_logging` and `Client.logging_level`. Note: Different loggers with multiple clients are not supported
Optionally, you can configure the caches here, by specifying the name of the cache, followed by a dict-style object to use.
It is recommended to use `smart_cache.create_cache` to configure the cache here.
as an example, this is a recommended attribute `message_cache=create_cache(250, 50)`,
!!! note
Setting a message cache hard limit to None is not recommended, as it could result in extremely high memory usage, we suggest a sane limit.
"""
def __init__(
self,
*,
activity: Union[Activity, str] = None,
auto_defer: Absent[Union[AutoDefer, bool]] = MISSING,
autocomplete_context: Type[AutocompleteContext] = AutocompleteContext,
component_context: Type[ComponentContext] = ComponentContext,
debug_scope: Absent["Snowflake_Type"] = MISSING,
default_prefix: str | Iterable[str] = MENTION_PREFIX,
delete_unused_application_cmds: bool = False,
enforce_interaction_perms: bool = True,
fetch_members: bool = False,
generate_prefixes: Absent[Callable[..., Coroutine]] = MISSING,
global_post_run_callback: Absent[Callable[..., Coroutine]] = MISSING,
global_pre_run_callback: Absent[Callable[..., Coroutine]] = MISSING,
intents: Union[int, Intents] = Intents.DEFAULT,
interaction_context: Type[InteractionContext] = InteractionContext,
logger: logging.Logger = logger,
owner_ids: Iterable["Snowflake_Type"] = (),
modal_context: Type[ModalContext] = ModalContext,
prefixed_context: Type[PrefixedContext] = PrefixedContext,
hybrid_context: Type[HybridContext] = HybridContext,
send_command_tracebacks: bool = True,
shard_id: int = 0,
status: Status = Status.ONLINE,
sync_interactions: bool = True,
sync_ext: bool = True,
total_shards: int = 1,
basic_logging: bool = False,
logging_level: int = logging.INFO,
**kwargs,
) -> None:
if basic_logging:
logging.basicConfig()
logger.setLevel(logging_level)
# Set Up logger and overwrite the constant
self.logger = logger
"""The logger NAFF should use. Do not use in combination with `Client.basic_logging` and `Client.logging_level`. Note: Different loggers with multiple clients are not supported"""
constants.logger = logger
# Configuration
self.sync_interactions = sync_interactions
"""Should application commands be synced"""
self.del_unused_app_cmd: bool = delete_unused_application_cmds
"""Should unused application commands be deleted?"""
self.sync_ext: bool = sync_ext
"""Should we sync whenever a extension is (un)loaded"""
self.debug_scope = to_snowflake(debug_scope) if debug_scope is not MISSING else MISSING
"""Sync global commands as guild for quicker command updates during debug"""
self.default_prefix = default_prefix
"""The default prefix to be used for prefixed commands"""
self.generate_prefixes = generate_prefixes if generate_prefixes is not MISSING else self.generate_prefixes
"""A coroutine that returns a prefix or an iterable of prefixes, for dynamic prefixes"""
self.send_command_tracebacks: bool = send_command_tracebacks
"""Should the traceback of command errors be sent in reply to the command invocation"""
if auto_defer is True:
auto_defer = AutoDefer(enabled=True)
else:
auto_defer = auto_defer or AutoDefer()
self.auto_defer = auto_defer
"""A system to automatically defer commands after a set duration"""
self.intents = intents if isinstance(intents, Intents) else Intents(intents)
# resources
self.http: HTTPClient = HTTPClient()
"""The HTTP client to use when interacting with discord endpoints"""
# context objects
self.interaction_context: Type[InteractionContext] = interaction_context
"""The object to instantiate for Interaction Context"""
self.prefixed_context: Type[PrefixedContext] = prefixed_context
"""The object to instantiate for Prefixed Context"""
self.component_context: Type[ComponentContext] = component_context
"""The object to instantiate for Component Context"""
self.autocomplete_context: Type[AutocompleteContext] = autocomplete_context
"""The object to instantiate for Autocomplete Context"""
self.modal_context: Type[ModalContext] = modal_context
"""The object to instantiate for Modal Context"""
self.hybrid_context: Type[HybridContext] = hybrid_context
"""The object to instantiate for Hybrid Context"""
# flags
self._ready = asyncio.Event()
self._closed = False
self._startup = False
self._guild_event = asyncio.Event()
self.guild_event_timeout = 3
"""How long to wait for guilds to be cached"""
# Sharding
self.total_shards = total_shards
self._connection_state: ConnectionState = ConnectionState(self, intents, shard_id)
self.enforce_interaction_perms = enforce_interaction_perms
self.fetch_members = fetch_members
"""Fetch the full members list of all guilds on startup"""
self._mention_reg = MISSING
# caches
self.cache: GlobalCache = GlobalCache(self, **{k: v for k, v in kwargs.items() if hasattr(GlobalCache, k)})
# these store the last sent presence data for change_presence
self._status: Status = status
if isinstance(activity, str):
self._activity = Activity.create(name=str(activity))
else:
self._activity: Activity = activity
self._user: Absent[NaffUser] = MISSING
self._app: Absent[Application] = MISSING
# collections
self.prefixed_commands: Dict[str, PrefixedCommand] = {}
"""A dictionary of registered prefixed commands: `{name: command}`"""
self.interactions: Dict["Snowflake_Type", Dict[str, InteractionCommand]] = {}
"""A dictionary of registered application commands: `{cmd_id: command}`"""
self._component_callbacks: Dict[str, Callable[..., Coroutine]] = {}
self._modal_callbacks: Dict[str, Callable[..., Coroutine]] = {}
self._interaction_scopes: Dict["Snowflake_Type", "Snowflake_Type"] = {}
self.processors: Dict[str, Callable[..., Coroutine]] = {}
self.__modules = {}
self.ext = {}
"""A dictionary of mounted ext"""
self.listeners: Dict[str, List] = {}
self.waits: Dict[str, List] = {}
self.owner_ids: set[Snowflake_Type] = set(owner_ids)
self.async_startup_tasks: list[Coroutine] = []
"""A list of coroutines to run during startup"""
# callbacks
if global_pre_run_callback:
if asyncio.iscoroutinefunction(global_pre_run_callback):
self.pre_run_callback: Callable[..., Coroutine] = global_pre_run_callback
else:
raise TypeError("Callback must be a coroutine")
else:
self.pre_run_callback = MISSING
if global_post_run_callback:
if asyncio.iscoroutinefunction(global_post_run_callback):
self.post_run_callback: Callable[..., Coroutine] = global_post_run_callback
else:
raise TypeError("Callback must be a coroutine")
else:
self.post_run_callback = MISSING
super().__init__()
self._sanity_check()
@property
def is_closed(self) -> bool:
"""Returns True if the bot has closed."""
return self._closed
@property
def is_ready(self) -> bool:
"""Returns True if the bot is ready."""
return self._ready.is_set()
@property
def latency(self) -> float:
"""Returns the latency of the websocket connection."""
return self._connection_state.latency
@property
def average_latency(self) -> float:
"""Returns the average latency of the websocket connection."""
return self._connection_state.average_latency
@property
def start_time(self) -> datetime:
"""The start time of the bot."""
return self._connection_state.start_time
@property
def gateway_started(self) -> bool:
"""Returns if the gateway has been started."""
return self._connection_state.gateway_started.is_set()
@property
def user(self) -> NaffUser:
"""Returns the bot's user."""
return self._user
@property
def app(self) -> Application:
"""Returns the bots application."""
return self._app
@property
def owner(self) -> Optional["User"]:
"""Returns the bot's owner'."""
try:
return self.app.owner
except TypeError:
return MISSING
@property
def owners(self) -> List["User"]:
"""Returns the bot's owners as declared via `client.owner_ids`."""
return [self.get_user(u_id) for u_id in self.owner_ids]
@property
def guilds(self) -> List["Guild"]:
"""Returns a list of all guilds the bot is in."""
return self.user.guilds
@property
def status(self) -> Status:
"""
Get the status of the bot.
IE online, afk, dnd
"""
return self._status
@property
def activity(self) -> Activity:
"""Get the activity of the bot."""
return self._activity
@property
def application_commands(self) -> List[InteractionCommand]:
"""A list of all application commands registered within the bot."""
commands = []
for scope in self.interactions.keys():
commands += [cmd for cmd in self.interactions[scope].values() if cmd not in commands]
return commands
@property
def ws(self) -> GatewayClient:
"""Returns the websocket client."""
return self._connection_state.gateway
def get_guild_websocket(self, id: "Snowflake_Type") -> GatewayClient:
return self.ws
def _sanity_check(self) -> None:
"""Checks for possible and common errors in the bot's configuration."""
logger.debug("Running client sanity checks...")
contexts = {
self.interaction_context: InteractionContext,
self.prefixed_context: PrefixedContext,
self.component_context: ComponentContext,
self.autocomplete_context: AutocompleteContext,
self.modal_context: ModalContext,
self.hybrid_context: HybridContext,
}
for obj, expected in contexts.items():
if not issubclass(obj, expected):
raise TypeError(f"{obj.__name__} must inherit from {expected.__name__}")
if self.del_unused_app_cmd:
logger.warning(
"As `delete_unused_application_cmds` is enabled, the client must cache all guilds app-commands, this could take a while."
)
if Intents.GUILDS not in self._connection_state.intents:
logger.warning("GUILD intent has not been enabled; this is very likely to cause errors")
if self.fetch_members and Intents.GUILD_MEMBERS not in self._connection_state.intents:
raise BotException("Members Intent must be enabled in order to use fetch members")
elif self.fetch_members:
logger.warning("fetch_members enabled; startup will be delayed")
if len(self.processors) == 0:
logger.warning("No Processors are loaded! This means no events will be processed!")
async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
"""
A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.
!!! note
To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client
Args:
bot: A reference to the client
message: A message to determine the prefix from.
Returns:
A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`
"""
return self.default_prefix
def _queue_task(self, coro: Listener, event: BaseEvent, *args, **kwargs) -> asyncio.Task:
async def _async_wrap(_coro: Listener, _event: BaseEvent, *_args, **_kwargs) -> None:
try:
if len(_event.__attrs_attrs__) == 2:
# override_name & bot
await _coro()
else:
await _coro(_event, *_args, **_kwargs)
except asyncio.CancelledError:
pass
except Exception as e:
if isinstance(event, events.Error):
# No infinite loops please
self.default_error_handler(repr(event), e)
else:
self.dispatch(events.Error(repr(event), e))
wrapped = _async_wrap(coro, event, *args, **kwargs)
return asyncio.create_task(wrapped, name=f"naff:: {event.resolved_name}")
@staticmethod
def default_error_handler(source: str, error: BaseException) -> None:
"""
The default error logging behaviour.
Args:
source: The source of this error
error: The exception itself
"""
out = traceback.format_exception(error)
if isinstance(error, HTTPException):
# HTTPException's are of 3 known formats, we can parse them for human readable errors
try:
errors = error.search_for_message(error.errors)
out = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(errors)
except Exception: # noqa : S110
pass
logger.error(
"Ignoring exception in {}:{}{}".format(source, "\n" if len(out) > 1 else " ", "".join(out)),
)
@Listener.create()
async def _on_error(self, event: events.Error) -> None:
await self.on_error(event.source, event.error, *event.args, **event.kwargs)
async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by the library.
By default it will format and print them to console
Override this to change error handling behaviour
"""
self.default_error_handler(source, error)
async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by commands.
By default it will call `Client.on_error`
Override this to change error handling behavior
"""
self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))
try:
if isinstance(error, errors.CommandOnCooldown):
await ctx.send(
embeds=Embed(
description=f"This command is on cooldown!\n"
f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
color=BrandColors.FUCHSIA,
)
)
elif isinstance(error, errors.MaxConcurrencyReached):
await ctx.send(
embeds=Embed(
description="This command has reached its maximum concurrent usage!\n"
"Please try again shortly.",
color=BrandColors.FUCHSIA,
)
)
elif isinstance(error, errors.CommandCheckFailure):
await ctx.send(
embeds=Embed(
description="You do not have permission to run this command!",
color=BrandColors.YELLOW,
)
)
elif self.send_command_tracebacks:
out = "".join(traceback.format_exception(error))
if self.http.token is not None:
out = out.replace(self.http.token, "[REDACTED TOKEN]")
await ctx.send(
embeds=Embed(
title=f"Error: {type(error).__name__}",
color=BrandColors.RED,
description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
)
)
except errors.NaffException:
pass
async def on_command(self, ctx: Context) -> None:
"""
Called *after* any command is ran.
By default, it will simply log the command, override this to change that behaviour
Args:
ctx: The context of the command that was called
"""
if isinstance(ctx, PrefixedContext):
symbol = "@"
elif isinstance(ctx, InteractionContext):
symbol = "/"
else:
symbol = "?" # likely custom context
logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by components.
By default it will call `Naff.on_error`
Override this to change error handling behavior
"""
return self.dispatch(events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx))
async def on_component(self, ctx: ComponentContext) -> None:
"""
Called *after* any component callback is ran.
By default, it will simply log the component use, override this to change that behaviour
Args:
ctx: The context of the component that was called
"""
symbol = "¢"
logger.info(f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by autocompletion options.
By default it will call `Naff.on_error`
Override this to change error handling behavior
"""
return self.dispatch(
events.Error(
f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}",
error,
args,
kwargs,
ctx,
)
)
async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
"""
Called *after* any autocomplete callback is ran.
By default, it will simply log the autocomplete callback, override this to change that behaviour
Args:
ctx: The context of the command that was called
"""
symbol = "$"
logger.info(f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
@Listener.create()
async def on_resume(self) -> None:
self._ready.set()
@Listener.create()
async def _on_websocket_ready(self, event: events.RawGatewayEvent) -> None:
"""
Catches websocket ready and determines when to dispatch the client `READY` signal.
Args:
event: The websocket ready packet
"""
data = event.data
expected_guilds = {to_snowflake(guild["id"]) for guild in data["guilds"]}
self._user._add_guilds(expected_guilds)
if not self._startup:
while True:
try: # wait to let guilds cache
await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
except asyncio.TimeoutError:
logger.warning("Timeout waiting for guilds cache: Not all guilds will be in cache")
break
self._guild_event.clear()
if len(self.cache.guild_cache) == len(expected_guilds):
# all guilds cached
break
if self.fetch_members:
# ensure all guilds have completed chunking
for guild in self.guilds:
if guild and not guild.chunked.is_set():
logger.debug(f"Waiting for {guild.id} to chunk")
await guild.chunked.wait()
# run any pending startup tasks
if self.async_startup_tasks:
try:
await asyncio.gather(*self.async_startup_tasks)
except Exception as e:
self.dispatch(events.Error("async-extension-loader", e))
# cache slash commands
if not self._startup:
await self._init_interactions()
self._startup = True
self.dispatch(events.Startup())
else:
# reconnect ready
ready_guilds = set()
async def _temp_listener(_event: events.RawGatewayEvent) -> None:
ready_guilds.add(_event.data["id"])
listener = Listener.create("_on_raw_guild_create")(_temp_listener)
self.add_listener(listener)
while True:
try:
await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
if len(ready_guilds) == len(expected_guilds):
break
except asyncio.TimeoutError:
break
self.listeners["raw_guild_create"].remove(listener)
self._ready.set()
self.dispatch(events.Ready())
async def login(self, token) -> None:
"""
Login to discord via http.
!!! note
You will need to run Naff.start_gateway() before you start receiving gateway events.
Args:
token str: Your bot's token
"""
# i needed somewhere to put this call,
# login will always run after initialisation
# so im gathering commands here
self._gather_commands()
logger.debug("Attempting to login")
me = await self.http.login(token.strip())
self._user = NaffUser.from_dict(me, self)
self.cache.place_user_data(me)
self._app = Application.from_dict(await self.http.get_current_bot_information(), self)
self._mention_reg = re.compile(rf"^(<@!?{self.user.id}*>\s)")
if self.app.owner:
self.owner_ids.add(self.app.owner.id)
self.dispatch(events.Login())
async def astart(self, token) -> None:
"""
Asynchronous method to start the bot.
Args:
token: Your bot's token
Returns:
"""
await self.login(token)
try:
await self._connection_state.start()
finally:
await self.stop()
def start(self, token) -> None:
"""
Start the bot.
info:
This is the recommended method to start the bot
Args:
token: Your bot's token
"""
try:
asyncio.run(self.astart(token))
except KeyboardInterrupt:
# ignore, cus this is useless and can be misleading to the
# user
pass
async def start_gateway(self) -> None:
"""Starts the gateway connection."""
try:
await self._connection_state.start()
finally:
await self.stop()
async def stop(self) -> None:
"""Shutdown the bot."""
logger.debug("Stopping the bot.")
self._ready.clear()
await self.http.close()
await self._connection_state.stop()
def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
"""
Dispatch an event.
Args:
event: The event to be dispatched.
"""
listeners = self.listeners.get(event.resolved_name, [])
if listeners:
logger.debug(f"Dispatching Event: {event.resolved_name}")
event.bot = self
for _listen in listeners:
try:
self._queue_task(_listen, event, *args, **kwargs)
except Exception as e:
raise BotException(
f"An error occurred attempting during {event.resolved_name} event processing"
) from e
_waits = self.waits.get(event.resolved_name, [])
if _waits:
index_to_remove = []
for i, _wait in enumerate(_waits):
result = _wait(event)
if result:
index_to_remove.append(i)
for idx in sorted(index_to_remove, reverse=True):
_waits.pop(idx)
async def wait_until_ready(self) -> None:
"""Waits for the client to become ready."""
await self._ready.wait()
def wait_for(
self,
event: Union[str, "BaseEvent"],
checks: Absent[Optional[Callable[..., bool]]] = MISSING,
timeout: Optional[float] = None,
) -> Any:
"""
Waits for a WebSocket event to be dispatched.
Args:
event: The name of event to wait.
checks: A predicate to check what to wait for.
timeout: The number of seconds to wait before timing out.
Returns:
The event object.
"""
event = get_event_name(event)
if event not in self.waits:
self.waits[event] = []
future = asyncio.Future()
self.waits[event].append(Wait(event, checks, future))
return asyncio.wait_for(future, timeout)
async def wait_for_modal(
self,
modal: "Modal",
author: Optional["Snowflake_Type"] = None,
timeout: Optional[float] = None,
) -> ModalContext:
"""
Wait for a modal response.
Args:
modal: The modal we're waiting for.
author: The user we're waiting for to reply
timeout: A timeout in seconds to stop waiting
Returns:
The context of the modal response
Raises:
`asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed
"""
author = to_snowflake(author) if author else None
def predicate(event) -> bool:
if modal.custom_id != event.context.custom_id:
return False
if author and author != to_snowflake(event.context.author):
return False
return True
resp = await self.wait_for("modal_response", predicate, timeout)
return resp.context
async def wait_for_component(
self,
messages: Union[Message, int, list] = None,
components: Optional[
Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
] = None,
check: Optional[Callable] = None,
timeout: Optional[float] = None,
) -> "Component":
"""
Waits for a component to be sent to the bot.
Args:
messages: The message object to check for.
components: The components to wait for.
check: A predicate to check what to wait for.
timeout: The number of seconds to wait before timing out.
Returns:
`Component` that was invoked. Use `.context` to get the `ComponentContext`.
Raises:
`asyncio.TimeoutError` if timed out
"""
if not (messages or components):
raise ValueError("You must specify messages or components (or both)")
message_ids = (
to_snowflake_list(messages) if isinstance(messages, list) else to_snowflake(messages) if messages else None
)
custom_ids = list(get_components_ids(components)) if components else None
# automatically convert improper custom_ids
if custom_ids and not all(isinstance(x, str) for x in custom_ids):
custom_ids = [str(i) for i in custom_ids]
def _check(event: Component) -> bool:
ctx: ComponentContext = event.context
# if custom_ids is empty or there is a match
wanted_message = not message_ids or ctx.message.id in (
[message_ids] if isinstance(message_ids, int) else message_ids
)
wanted_component = not custom_ids or ctx.custom_id in custom_ids
if wanted_message and wanted_component:
if check is None or check(event):
return True
return False
return False
return await self.wait_for("component", checks=_check, timeout=timeout)
def listen(self, event_name: Absent[str] = MISSING) -> Listener:
"""
A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.
Args:
event_name: The event name to use, if not the coroutine name
Returns:
A listener that can be used to hook into the event.
"""
def wrapper(coro: Callable[..., Coroutine]) -> Listener:
listener = Listener.create(event_name)(coro)
self.add_listener(listener)
return listener
return wrapper
def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
"""
A decorator to be used to add event processors.
Args:
event_name: The event name to use, if not the coroutine name
Returns:
A function that can be used to hook into the event.
"""
def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
name = event_name
if name is MISSING:
name = coro.__name__
name = name.lstrip("_")
name = name.removeprefix("on_")
self.processors[name] = coro
return coro
return wrapper
def add_listener(self, listener: Listener) -> None:
"""
Add a listener for an event, if no event is passed, one is determined.
Args:
listener Listener: The listener to add to the client
"""
# check that the required intents are enabled
event_class_name = "".join([name.capitalize() for name in listener.event.split("_")])
if event_class := globals().get(event_class_name):
if required_intents := _INTENT_EVENTS.get(event_class): # noqa
if not any(required_intent in self.intents for required_intent in required_intents):
self.logger.warning(
f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
)
if listener.event not in self.listeners:
self.listeners[listener.event] = []
self.listeners[listener.event].append(listener)
def add_interaction(self, command: InteractionCommand) -> bool:
"""
Add a slash command to the client.
Args:
command InteractionCommand: The command to add
"""
if self.debug_scope:
command.scopes = [self.debug_scope]
# for SlashCommand objs without callback (like objects made to hold group info etc)
if command.callback is None:
return False
for scope in command.scopes:
if scope not in self.interactions:
self.interactions[scope] = {}
elif command.resolved_name in self.interactions[scope]:
old_cmd = self.interactions[scope][command.resolved_name]
raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")
if self.enforce_interaction_perms:
command.checks.append(command._permission_enforcer) # noqa : w0212
self.interactions[scope][command.resolved_name] = command
return True
def add_hybrid_command(self, command: HybridCommand) -> bool:
if self.debug_scope:
command.scopes = [self.debug_scope]
if command.callback is None:
return False
if command.is_subcommand:
prefixed_base = self.prefixed_commands.get(str(command.name))
if not prefixed_base:
prefixed_base = _base_subcommand_generator(
str(command.name), list(command.name.to_locale_dict().values()), str(command.description)
)
self.add_prefixed_command(prefixed_base)
if command.group_name: # if this is a group command
_prefixed_cmd = prefixed_base
prefixed_base = prefixed_base.subcommands.get(str(command.group_name))
if not prefixed_base:
prefixed_base = _base_subcommand_generator(
str(command.group_name),
list(command.group_name.to_locale_dict().values()),
str(command.group_description),
group=True,
)
_prefixed_cmd.add_command(prefixed_base)
new_command = _prefixed_from_slash(command)
new_command._parse_parameters()
prefixed_base.add_command(new_command)
else:
new_command = _prefixed_from_slash(command)
self.add_prefixed_command(new_command)
return self.add_interaction(command)
def add_prefixed_command(self, command: PrefixedCommand) -> None:
"""
Add a prefixed command to the client.
Args:
command PrefixedCommand: The command to add
"""
# check that the required intent is enabled or the prefix is a mention
prefixes = (
self.default_prefix
if not isinstance(self.default_prefix, str) and not self.default_prefix == MENTION_PREFIX
else (self.default_prefix,)
)
if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
self.logger.warning(
f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
)
command._parse_parameters()
if self.prefixed_commands.get(command.name):
raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
self.prefixed_commands[command.name] = command
for alias in command.aliases:
if self.prefixed_commands.get(alias):
raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
self.prefixed_commands[alias] = command
def add_component_callback(self, command: ComponentCommand) -> None:
"""
Add a component callback to the client.
Args:
command: The command to add
"""
for listener in command.listeners:
# I know this isn't an ideal solution, but it means we can lookup callbacks with O(1)
if listener not in self._component_callbacks.keys():
self._component_callbacks[listener] = command
continue
else:
raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")
def add_modal_callback(self, command: ModalCommand) -> None:
"""
Add a modal callback to the client.
Args:
command: The command to add
"""
for listener in command.listeners:
if listener not in self._modal_callbacks.keys():
self._modal_callbacks[listener] = command
continue
else:
raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")
def _gather_commands(self) -> None:
"""Gathers commands from __main__ and self."""
def process(_cmds) -> None:
for func in _cmds:
if isinstance(func, ModalCommand):
self.add_modal_callback(func)
elif isinstance(func, ComponentCommand):
self.add_component_callback(func)
elif isinstance(func, HybridCommand):
self.add_hybrid_command(func)
elif isinstance(func, InteractionCommand):
self.add_interaction(func)
elif (
isinstance(func, PrefixedCommand) and not func.is_subcommand
): # subcommands will be added with main comamnds
self.add_prefixed_command(func)
elif isinstance(func, Listener):
self.add_listener(func)
logger.debug(f"{len(_cmds)} commands have been loaded from `__main__` and `client`")
process(
[obj for _, obj in inspect.getmembers(sys.modules["__main__"]) if isinstance(obj, (BaseCommand, Listener))]
)
process(
[
obj.copy_with_binding(self)
for _, obj in inspect.getmembers(self)
if isinstance(obj, (BaseCommand, Listener))
]
)
[wrap_partial(obj, self) for _, obj in inspect.getmembers(self) if isinstance(obj, Task)]
async def _init_interactions(self) -> None:
"""
Initialise slash commands.
If `sync_interactions` this will submit all registered slash
commands to discord. Otherwise, it will get the list of
interactions and cache their scopes.
"""
# allow for ext and main to share the same decorator
try:
if self.sync_interactions:
await self.synchronise_interactions()
else:
await self._cache_interactions(warn_missing=False)
except Exception as e:
self.dispatch(events.Error("Interaction Syncing", e))
async def _cache_interactions(self, warn_missing: bool = False) -> None:
"""Get all interactions used by this bot and cache them."""
if warn_missing or self.del_unused_app_cmd:
bot_scopes = {g.id for g in self.cache.guild_cache.values()}
bot_scopes.add(GLOBAL_SCOPE)
else:
bot_scopes = set(self.interactions)
req_lock = asyncio.Lock()
async def wrap(*args, **kwargs) -> Absent[List[Dict]]:
async with req_lock:
# throttle this
await asyncio.sleep(0.1)
try:
return await self.http.get_application_commands(*args, **kwargs)
except Forbidden:
return MISSING
results = await asyncio.gather(*[wrap(self.app.id, scope) for scope in bot_scopes])
results = dict(zip(bot_scopes, results))
for scope, remote_cmds in results.items():
if remote_cmds == MISSING:
logger.debug(f"Bot was not invited to guild {scope} with `application.commands` scope")
continue
remote_cmds = {cmd_data["name"]: cmd_data for cmd_data in remote_cmds}
found = set() # this is a temporary hack to fix subcommand detection
if scope in self.interactions:
for cmd in self.interactions[scope].values():
cmd_name = str(cmd.name)
cmd_data = remote_cmds.get(cmd_name, MISSING)
if cmd_data is MISSING:
if cmd_name not in found:
if warn_missing:
logger.error(
f'Detected yet to sync slash command "/{cmd_name}" for scope '
f"{'global' if scope == GLOBAL_SCOPE else scope}"
)
continue
else:
found.add(cmd_name)
self._interaction_scopes[str(cmd_data["id"])] = scope
cmd.cmd_id[scope] = int(cmd_data["id"])
if warn_missing:
for cmd_data in remote_cmds.values():
logger.error(
f"Detected unimplemented slash command \"/{cmd_data['name']}\" for scope "
f"{'global' if scope == GLOBAL_SCOPE else scope}"
)
async def synchronise_interactions(
self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
) -> None:
"""
Synchronise registered interactions with discord.
Args:
scopes: Optionally specify which scopes are to be synced
delete_commands: Override the client setting and delete commands
"""
s = time.perf_counter()
_delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
await self._cache_interactions()
if scopes is not MISSING:
cmd_scopes = scopes
elif self.del_unused_app_cmd:
# if we're deleting unused commands, we check all scopes
cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
else:
# if we're not deleting, just check the scopes we have cmds registered in
cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})
local_cmds_json = application_commands_to_dict(self.interactions)
async def sync_scope(cmd_scope) -> None:
sync_needed_flag = False # a flag to force this scope to synchronise
sync_payload = [] # the payload to be pushed to discord
try:
try:
remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
except Forbidden:
logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
return
for local_cmd in self.interactions.get(cmd_scope, {}).values():
# get remote equivalent of this command
remote_cmd_json = next(
(v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
)
# get json representation of this command
local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))
# this works by adding any command we *want* on Discord, to a payload, and synchronising that
# this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls
if sync_needed(local_cmd_json, remote_cmd_json):
# determine if the local and remote commands are out-of-sync
sync_needed_flag = True
sync_payload.append(local_cmd_json)
elif not _delete_cmds and remote_cmd_json:
_remote_payload = {
k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
}
sync_payload.append(_remote_payload)
elif _delete_cmds:
sync_payload.append(local_cmd_json)
sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]
if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
# synchronise commands if flag is set, or commands are to be deleted
logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
sync_response: list[dict] = await self.http.overwrite_application_commands(
self.app.id, sync_payload, cmd_scope
)
self._cache_sync_response(sync_response, cmd_scope)
else:
logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")
except Forbidden as e:
raise InteractionMissingAccess(cmd_scope) from e
except HTTPException as e:
self._raise_sync_exception(e, local_cmds_json, cmd_scope)
await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])
t = time.perf_counter() - s
logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")
def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
"""
Get a application command from the internal cache by its ID.
Args:
cmd_id: The ID of the command
Returns:
The command, if one with the given ID exists internally, otherwise None
"""
scope = self._interaction_scopes.get(str(cmd_id), MISSING)
cmd_id = int(cmd_id) # ensure int ID
if scope != MISSING:
for cmd in self.interactions[scope].values():
if int(cmd.cmd_id.get(scope)) == cmd_id:
return cmd
return None
@staticmethod
def _raise_sync_exception(e: HTTPException, cmds_json: dict, cmd_scope: "Snowflake_Type") -> NoReturn:
try:
if isinstance(e.errors, dict):
for cmd_num in e.errors.keys():
cmd = cmds_json[cmd_scope][int(cmd_num)]
output = e.search_for_message(e.errors[cmd_num], cmd)
if len(output) > 1:
output = "\n".join(output)
logger.error(f"Multiple Errors found in command `{cmd['name']}`:\n{output}")
else:
logger.error(f"Error in command `{cmd['name']}`: {output[0]}")
else:
raise e from None
except Exception:
# the above shouldn't fail, but if it does, just raise the exception normally
raise e from None
def _cache_sync_response(self, sync_response: list[dict], scope: "Snowflake_Type") -> None:
for cmd_data in sync_response:
self._interaction_scopes[cmd_data["id"]] = scope
if cmd_data["name"] in self.interactions[scope]:
self.interactions[scope][cmd_data["name"]].cmd_id[scope] = int(cmd_data["id"])
else:
# sub_cmd
for sc in cmd_data["options"]:
if sc["type"] == OptionTypes.SUB_COMMAND:
if f"{cmd_data['name']} {sc['name']}" in self.interactions[scope]:
self.interactions[scope][f"{cmd_data['name']} {sc['name']}"].cmd_id[scope] = int(
cmd_data["id"]
)
elif sc["type"] == OptionTypes.SUB_COMMAND_GROUP:
for _sc in sc["options"]:
if f"{cmd_data['name']} {sc['name']} {_sc['name']}" in self.interactions[scope]:
self.interactions[scope][f"{cmd_data['name']} {sc['name']} {_sc['name']}"].cmd_id[
scope
] = int(cmd_data["id"])
@overload
async def get_context(self, data: ComponentChannelInteractionData, interaction: Literal[True]) -> ComponentContext:
...
@overload
async def get_context(
self, data: AutocompleteChannelInteractionData, interaction: Literal[True]
) -> AutocompleteContext:
...
# as of right now, discord_typings doesn't include anything like this
# @overload
# async def get_context(self, data: ModalSubmitInteractionData, interaction: Literal[True]) -> ModalContext:
# ...
@overload
async def get_context(self, data: InteractionData, interaction: Literal[True]) -> InteractionContext:
...
@overload
async def get_context(
self, data: dict, interaction: Literal[True]
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext:
# fallback case since some data isn't typehinted properly
...
@overload
async def get_context(self, data: Message, interaction: Literal[False] = False) -> PrefixedContext:
...
async def get_context(
self, data: InteractionData | dict | Message, interaction: bool = False
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
"""
Return a context object based on data passed.
note:
If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.
Args:
data: The data of the event
interaction: Is this an interaction or not?
Returns:
Context object
"""
# this line shuts up IDE warnings
cls: ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext
if interaction:
match data["type"]:
case InteractionTypes.MESSAGE_COMPONENT:
cls = self.component_context.from_dict(data, self)
case InteractionTypes.AUTOCOMPLETE:
cls = self.autocomplete_context.from_dict(data, self)
case InteractionTypes.MODAL_RESPONSE:
cls = self.modal_context.from_dict(data, self)
case _:
cls = self.interaction_context.from_dict(data, self)
if not cls.channel:
try:
cls.channel = await self.cache.fetch_channel(data["channel_id"])
except Forbidden:
cls.channel = BaseChannel.from_dict_factory(
{"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
)
else:
cls = self.prefixed_context.from_message(self, data)
if not cls.channel:
cls.channel = await self.cache.fetch_channel(data._channel_id)
return cls
async def _run_slash_command(self, command: SlashCommand, ctx: InteractionContext) -> Any:
"""Overrideable method that executes slash commands, can be used to wrap callback execution"""
return await command(ctx, **ctx.kwargs)
async def _run_prefixed_command(self, command: PrefixedCommand, ctx: PrefixedContext) -> Any:
"""Overrideable method that executes prefixed commands, can be used to wrap callback execution"""
return await command(ctx)
@processors.Processor.define("raw_interaction_create")
async def _dispatch_interaction(self, event: RawGatewayEvent) -> None:
"""
Identify and dispatch interaction of slash commands or components.
Args:
raw interaction event
"""
interaction_data = event.data
if interaction_data["type"] in (
InteractionTypes.PING,
InteractionTypes.APPLICATION_COMMAND,
InteractionTypes.AUTOCOMPLETE,
):
interaction_id = interaction_data["data"]["id"]
name = interaction_data["data"]["name"]
scope = self._interaction_scopes.get(str(interaction_id))
if scope in self.interactions:
ctx = await self.get_context(interaction_data, True)
ctx.command: SlashCommand = self.interactions[scope][ctx.invoke_target] # type: ignore
logger.debug(f"{scope} :: {ctx.command.name} should be called")
if ctx.command.auto_defer:
auto_defer = ctx.command.auto_defer
elif ctx.command.extension and ctx.command.extension.auto_defer:
auto_defer = ctx.command.extension.auto_defer
else:
auto_defer = self.auto_defer
if auto_opt := getattr(ctx, "focussed_option", None):
try:
await ctx.command.autocomplete_callbacks[auto_opt](ctx, **ctx.kwargs)
except Exception as e:
await self.on_autocomplete_error(ctx, e)
finally:
await self.on_autocomplete(ctx)
else:
try:
await auto_defer(ctx)
if self.pre_run_callback:
await self.pre_run_callback(ctx, **ctx.kwargs)
await self._run_slash_command(ctx.command, ctx)
if self.post_run_callback:
await self.post_run_callback(ctx, **ctx.kwargs)
except Exception as e:
await self.on_command_error(ctx, e)
finally:
await self.on_command(ctx)
else:
logger.error(f"Unknown cmd_id received:: {interaction_id} ({name})")
elif interaction_data["type"] == InteractionTypes.MESSAGE_COMPONENT:
# Buttons, Selects, ContextMenu::Message
ctx = await self.get_context(interaction_data, True)
component_type = interaction_data["data"]["component_type"]
self.dispatch(events.Component(ctx))
if callback := self._component_callbacks.get(ctx.custom_id):
ctx.command = callback
try:
if self.pre_run_callback:
await self.pre_run_callback(ctx)
await callback(ctx)
if self.post_run_callback:
await self.post_run_callback(ctx)
except Exception as e:
await self.on_component_error(ctx, e)
finally:
await self.on_component(ctx)
if component_type == ComponentTypes.BUTTON:
self.dispatch(events.Button(ctx))
if component_type == ComponentTypes.SELECT:
self.dispatch(events.Select(ctx))
elif interaction_data["type"] == InteractionTypes.MODAL_RESPONSE:
ctx = await self.get_context(interaction_data, True)
self.dispatch(events.ModalResponse(ctx))
# todo: Polls remove this icky code duplication - love from past-polls ❤️
if callback := self._modal_callbacks.get(ctx.custom_id):
ctx.command = callback
try:
if self.pre_run_callback:
await self.pre_run_callback(ctx)
await callback(ctx)
if self.post_run_callback:
await self.post_run_callback(ctx)
except Exception as e:
await self.on_component_error(ctx, e)
finally:
await self.on_component(ctx)
else:
raise NotImplementedError(f"Unknown Interaction Received: {interaction_data['type']}")
@Listener.create("message_create")
async def _dispatch_prefixed_commands(self, event: MessageCreate) -> None:
"""Determine if a prefixed command is being triggered, and dispatch it."""
message = event.message
if not message.content:
return
if not message.author.bot:
prefixes: str | Iterable[str] = await self.generate_prefixes(self, message)
if isinstance(prefixes, str) or prefixes == MENTION_PREFIX:
# its easier to treat everything as if it may be an iterable
# rather than building a special case for this
prefixes = (prefixes,) # type: ignore
prefix_used = None
for prefix in prefixes:
if prefix == MENTION_PREFIX:
if mention := self._mention_reg.search(message.content): # type: ignore
prefix = mention.group()
else:
continue
if message.content.startswith(prefix):
prefix_used = prefix
break
if prefix_used:
context = await self.get_context(message)
context.prefix = prefix_used
# interestingly enough, we cannot count on ctx.invoke_target
# being correct as its hard to account for newlines and the like
# with the way we get subcommands here
# we'll have to reconstruct it by getting the content_parameters
# then removing the prefix and the parameters from the message
# content
content_parameters = message.content.removeprefix(prefix_used) # type: ignore
command = self # yes, this is a hack
while True:
first_word: str = get_first_word(content_parameters) # type: ignore
if isinstance(command, PrefixedCommand):
new_command = command.subcommands.get(first_word)
else:
new_command = command.prefixed_commands.get(first_word)
if not new_command or not new_command.enabled:
break
command = new_command
content_parameters = content_parameters.removeprefix(first_word).strip()
if command.subcommands and command.hierarchical_checking:
try:
await new_command._can_run(context) # will error out if we can't run this command
except Exception as e:
if new_command.error_callback:
await new_command.error_callback(e, context)
elif new_command.extension and new_command.extension.extension_error:
await new_command.extension.extension_error(context)
else:
await self.on_command_error(context, e)
return
if not isinstance(command, PrefixedCommand):
command = None
if command and command.enabled:
# yeah, this looks ugly
context.command = command
context.invoke_target = (
message.content.removeprefix(prefix_used).removesuffix(content_parameters).strip() # type: ignore
)
context.args = get_args(context.content_parameters)
try:
if self.pre_run_callback:
await self.pre_run_callback(context)
await self._run_prefixed_command(command, context)
if self.post_run_callback:
await self.post_run_callback(context)
except Exception as e:
await self.on_command_error(context, e)
finally:
await self.on_command(context)
@Listener.create("disconnect")
async def _disconnect(self) -> None:
self._ready.clear()
def get_extensions(self, name: str) -> list[Extension]:
"""
Get all ext with a name or extension name.
Args:
name: The name of the extension, or the name of it's extension
Returns:
List of Extensions
"""
if name not in self.ext.keys():
return [ext for ext in self.ext.values() if ext.extension_name == name]
return [self.ext.get(name, None)]
def get_ext(self, name: str) -> Extension | None:
"""
Get a extension with a name or extension name.
Args:
name: The name of the extension, or the name of it's extension
Returns:
A extension, if found
"""
if ext := self.get_extensions(name):
return ext[0]
return None
def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
"""
Load an extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
load_kwargs: The auto-filled mapping of the load keyword arguments
"""
name = importlib.util.resolve_name(name, package)
if name in self.__modules:
raise Exception(f"{name} already loaded")
module = importlib.import_module(name, package)
try:
setup = getattr(module, "setup", None)
if not setup:
raise ExtensionLoadException(
f"{name} lacks an entry point. Ensure you have a function called `setup` defined in that file"
) from None
setup(self, **load_kwargs)
except ExtensionLoadException:
raise
except Exception as e:
del sys.modules[name]
raise ExtensionLoadException(f"Unexpected Error loading {name}") from e
else:
logger.debug(f"Loaded Extension: {name}")
self.__modules[name] = module
if self.sync_ext and self._ready.is_set():
try:
asyncio.get_running_loop()
except RuntimeError:
return
asyncio.create_task(self.synchronise_interactions())
def unload_extension(self, name, package=None, **unload_kwargs) -> None:
"""
Unload an extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
unload_kwargs: The auto-filled mapping of the unload keyword arguments
"""
name = importlib.util.resolve_name(name, package)
module = self.__modules.get(name)
if module is None:
raise ExtensionNotFound(f"No extension called {name} is loaded")
try:
teardown = getattr(module, "teardown")
teardown(**unload_kwargs)
except AttributeError:
pass
for ext in self.get_extensions(name):
ext.drop(**unload_kwargs)
del sys.modules[name]
del self.__modules[name]
if self.sync_ext and self._ready.is_set():
if self.sync_ext and self._ready.is_set():
try:
asyncio.get_running_loop()
except RuntimeError:
return
asyncio.create_task(self.synchronise_interactions())
def reload_extension(
self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
) -> None:
"""
Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
load_kwargs: The manually-filled mapping of the load keyword arguments
unload_kwargs: The manually-filled mapping of the unload keyword arguments
"""
name = importlib.util.resolve_name(name, package)
module = self.__modules.get(name)
if module is None:
logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
return self.load_extension(name, package)
if not load_kwargs:
load_kwargs = {}
if not unload_kwargs:
unload_kwargs = {}
self.unload_extension(name, package, **unload_kwargs)
self.load_extension(name, package, **load_kwargs)
# todo: maybe add an ability to revert to the previous version if unable to load the new one
async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
"""
Fetch a guild.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
guild_id: The ID of the guild to get
Returns:
Guild Object if found, otherwise None
"""
try:
return await self.cache.fetch_guild(guild_id)
except NotFound:
return None
def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
"""
Get a guild.
Note:
This method is an alias for the cache which will return a cached object.
Args:
guild_id: The ID of the guild to get
Returns:
Guild Object if found, otherwise None
"""
return self.cache.get_guild(guild_id)
async def create_guild_from_template(
self,
template_code: Union["GuildTemplate", str],
name: str,
icon: Absent[UPLOADABLE_TYPE] = MISSING,
) -> Optional[Guild]:
"""
Creates a new guild based on a template.
note:
This endpoint can only be used by bots in less than 10 guilds.
Args:
template_code: The code of the template to use.
name: The name of the guild (2-100 characters)
icon: Location or File of icon to set
Returns:
The newly created guild object
"""
if isinstance(template_code, GuildTemplate):
template_code = template_code.code
if icon:
icon = to_image_data(icon)
guild_data = await self.http.create_guild_from_guild_template(template_code, name, icon)
return Guild.from_dict(guild_data, self)
async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
"""
Fetch a channel.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
channel_id: The ID of the channel to get
Returns:
Channel Object if found, otherwise None
"""
try:
return await self.cache.fetch_channel(channel_id)
except NotFound:
return None
def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
"""
Get a channel.
Note:
This method is an alias for the cache which will return a cached object.
Args:
channel_id: The ID of the channel to get
Returns:
Channel Object if found, otherwise None
"""
return self.cache.get_channel(channel_id)
async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
"""
Fetch a user.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
user_id: The ID of the user to get
Returns:
User Object if found, otherwise None
"""
try:
return await self.cache.fetch_user(user_id)
except NotFound:
return None
def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
"""
Get a user.
Note:
This method is an alias for the cache which will return a cached object.
Args:
user_id: The ID of the user to get
Returns:
User Object if found, otherwise None
"""
return self.cache.get_user(user_id)
async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
"""
Fetch a member from a guild.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
user_id: The ID of the member
guild_id: The ID of the guild to get the member from
Returns:
Member object if found, otherwise None
"""
try:
return await self.cache.fetch_member(guild_id, user_id)
except NotFound:
return None
def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
"""
Get a member from a guild.
Note:
This method is an alias for the cache which will return a cached object.
Args:
user_id: The ID of the member
guild_id: The ID of the guild to get the member from
Returns:
Member object if found, otherwise None
"""
return self.cache.get_member(guild_id, user_id)
async def fetch_scheduled_event(
self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
) -> Optional["ScheduledEvent"]:
"""
Fetch a scheduled event by id.
Args:
event_id: The id of the scheduled event.
Returns:
The scheduled event if found, otherwise None
"""
try:
scheduled_event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
return ScheduledEvent.from_dict(scheduled_event_data, self)
except NotFound:
return None
async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
"""
Fetch a custom emoji by id.
Args:
emoji_id: The id of the custom emoji.
guild_id: The id of the guild the emoji belongs to.
Returns:
The custom emoji if found, otherwise None.
"""
try:
return await self.cache.fetch_emoji(guild_id, emoji_id)
except NotFound:
return None
def get_custom_emoji(
self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
) -> Optional[CustomEmoji]:
"""
Get a custom emoji by id.
Args:
emoji_id: The id of the custom emoji.
guild_id: The id of the guild the emoji belongs to.
Returns:
The custom emoji if found, otherwise None.
"""
emoji = self.cache.get_emoji(emoji_id)
if emoji and (not guild_id or emoji._guild_id == to_snowflake(guild_id)):
return emoji
return None
async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
"""
Fetch a sticker by ID.
Args:
sticker_id: The ID of the sticker.
Returns:
A sticker object if found, otherwise None
"""
try:
sticker_data = await self.http.get_sticker(sticker_id)
return Sticker.from_dict(sticker_data, self)
except NotFound:
return None
async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
"""
List the sticker packs available to Nitro subscribers.
Returns:
A list of StickerPack objects if found, otherwise returns None
"""
try:
packs_data = await self.http.list_nitro_sticker_packs()
return [StickerPack.from_dict(data, self) for data in packs_data]
except NotFound:
return None
async def fetch_voice_regions(self) -> List["VoiceRegion"]:
"""
List the voice regions available on Discord.
Returns:
A list of voice regions.
"""
regions_data = await self.http.list_voice_regions()
regions = VoiceRegion.from_list(regions_data)
return regions
async def connect_to_vc(
self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
) -> ActiveVoiceState:
"""
Connect the bot to a voice channel.
Args:
guild_id: id of the guild the voice channel is in.
channel_id: id of the voice channel client wants to join.
muted: Whether the bot should be muted when connected.
deafened: Whether the bot should be deafened when connected.
Returns:
The new active voice state on successfully connection.
"""
return await self._connection_state.voice_connect(guild_id, channel_id, muted, deafened)
def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
"""
Get the bot's voice state for a guild.
Args:
guild_id: The target guild's id.
Returns:
The bot's voice state for the guild if connected, otherwise None.
"""
return self._connection_state.get_voice_state(guild_id)
async def change_presence(
self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
) -> None:
"""
Change the bots presence.
Args:
status: The status for the bot to be. i.e. online, afk, etc.
activity: The activity for the bot to be displayed as doing.
Note::
Bots may only be `playing` `streaming` `listening` `watching` or `competing`, other activity types are likely to fail.
"""
await self._connection_state.change_presence(status, activity)
property
readonly
is_closed: bool
¶
Returns True if the bot has closed.
property
readonly
is_ready: bool
¶
Returns True if the bot is ready.
property
readonly
latency: float
¶
Returns the latency of the websocket connection.
property
readonly
average_latency: float
¶
Returns the average latency of the websocket connection.
property
readonly
start_time: datetime
¶
The start time of the bot.
property
readonly
gateway_started: bool
¶
Returns if the gateway has been started.
property
readonly
user: NaffUser
¶
Returns the bot's user.
property
readonly
app: Application
¶
Returns the bots application.
property
readonly
owner: Optional[User]
¶
Returns the bot's owner'.
property
readonly
owners: List[User]
¶
Returns the bot's owners as declared via client.owner_ids
.
property
readonly
guilds: List[Guild]
¶
Returns a list of all guilds the bot is in.
property
readonly
status: Status
¶
Get the status of the bot.
IE online, afk, dnd
property
readonly
activity: Activity
¶
Get the activity of the bot.
property
readonly
application_commands: List[naff.models.naff.application_commands.InteractionCommand]
¶
A list of all application commands registered within the bot.
property
readonly
ws: GatewayClient
¶
Returns the websocket client.
async
method
generate_prefixes(self, bot, message)
¶
A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.
Note
To easily override this method, simply use the generate_prefixes
parameter when instantiating the client
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bot |
Client |
A reference to the client |
required |
message |
Message |
A message to determine the prefix from. |
required |
Returns:
Type | Description |
---|---|
str | collections.abc.Iterable[str] |
A string or an iterable of strings to use as a prefix. By default, this will return |
Source code in naff/client/client.py
async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
"""
A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.
!!! note
To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client
Args:
bot: A reference to the client
message: A message to determine the prefix from.
Returns:
A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`
"""
return self.default_prefix
staticmethod
method
default_error_handler(source, error)
¶
The default error logging behaviour.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
The source of this error |
required |
error |
BaseException |
The exception itself |
required |
Source code in naff/client/client.py
@staticmethod
def default_error_handler(source: str, error: BaseException) -> None:
"""
The default error logging behaviour.
Args:
source: The source of this error
error: The exception itself
"""
out = traceback.format_exception(error)
if isinstance(error, HTTPException):
# HTTPException's are of 3 known formats, we can parse them for human readable errors
try:
errors = error.search_for_message(error.errors)
out = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(errors)
except Exception: # noqa : S110
pass
logger.error(
"Ignoring exception in {}:{}{}".format(source, "\n" if len(out) > 1 else " ", "".join(out)),
)
async
method
on_error(self, source, error, *args, **kwargs)
¶
Catches all errors dispatched by the library.
By default it will format and print them to console
Override this to change error handling behaviour
Source code in naff/client/client.py
async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by the library.
By default it will format and print them to console
Override this to change error handling behaviour
"""
self.default_error_handler(source, error)
async
method
on_command_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by commands.
By default it will call Client.on_error
Override this to change error handling behavior
Source code in naff/client/client.py
async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by commands.
By default it will call `Client.on_error`
Override this to change error handling behavior
"""
self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))
try:
if isinstance(error, errors.CommandOnCooldown):
await ctx.send(
embeds=Embed(
description=f"This command is on cooldown!\n"
f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
color=BrandColors.FUCHSIA,
)
)
elif isinstance(error, errors.MaxConcurrencyReached):
await ctx.send(
embeds=Embed(
description="This command has reached its maximum concurrent usage!\n"
"Please try again shortly.",
color=BrandColors.FUCHSIA,
)
)
elif isinstance(error, errors.CommandCheckFailure):
await ctx.send(
embeds=Embed(
description="You do not have permission to run this command!",
color=BrandColors.YELLOW,
)
)
elif self.send_command_tracebacks:
out = "".join(traceback.format_exception(error))
if self.http.token is not None:
out = out.replace(self.http.token, "[REDACTED TOKEN]")
await ctx.send(
embeds=Embed(
title=f"Error: {type(error).__name__}",
color=BrandColors.RED,
description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
)
)
except errors.NaffException:
pass
async
method
on_command(self, ctx)
¶
Called after any command is ran.
By default, it will simply log the command, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
Context |
The context of the command that was called |
required |
Source code in naff/client/client.py
async def on_command(self, ctx: Context) -> None:
"""
Called *after* any command is ran.
By default, it will simply log the command, override this to change that behaviour
Args:
ctx: The context of the command that was called
"""
if isinstance(ctx, PrefixedContext):
symbol = "@"
elif isinstance(ctx, InteractionContext):
symbol = "/"
else:
symbol = "?" # likely custom context
logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async
method
on_component_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by components.
By default it will call Naff.on_error
Override this to change error handling behavior
Source code in naff/client/client.py
async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by components.
By default it will call `Naff.on_error`
Override this to change error handling behavior
"""
return self.dispatch(events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx))
async
method
on_component(self, ctx)
¶
Called after any component callback is ran.
By default, it will simply log the component use, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
ComponentContext |
The context of the component that was called |
required |
Source code in naff/client/client.py
async def on_component(self, ctx: ComponentContext) -> None:
"""
Called *after* any component callback is ran.
By default, it will simply log the component use, override this to change that behaviour
Args:
ctx: The context of the component that was called
"""
symbol = "¢"
logger.info(f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async
method
on_autocomplete_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by autocompletion options.
By default it will call Naff.on_error
Override this to change error handling behavior
Source code in naff/client/client.py
async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
"""
Catches all errors dispatched by autocompletion options.
By default it will call `Naff.on_error`
Override this to change error handling behavior
"""
return self.dispatch(
events.Error(
f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}",
error,
args,
kwargs,
ctx,
)
)
async
method
on_autocomplete(self, ctx)
¶
Called after any autocomplete callback is ran.
By default, it will simply log the autocomplete callback, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
AutocompleteContext |
The context of the command that was called |
required |
Source code in naff/client/client.py
async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
"""
Called *after* any autocomplete callback is ran.
By default, it will simply log the autocomplete callback, override this to change that behaviour
Args:
ctx: The context of the command that was called
"""
symbol = "$"
logger.info(f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async
method
login(self, token)
¶
Login to discord via http.
Note
You will need to run Naff.start_gateway() before you start receiving gateway events.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
Your bot's token |
required |
Source code in naff/client/client.py
async def login(self, token) -> None:
"""
Login to discord via http.
!!! note
You will need to run Naff.start_gateway() before you start receiving gateway events.
Args:
token str: Your bot's token
"""
# i needed somewhere to put this call,
# login will always run after initialisation
# so im gathering commands here
self._gather_commands()
logger.debug("Attempting to login")
me = await self.http.login(token.strip())
self._user = NaffUser.from_dict(me, self)
self.cache.place_user_data(me)
self._app = Application.from_dict(await self.http.get_current_bot_information(), self)
self._mention_reg = re.compile(rf"^(<@!?{self.user.id}*>\s)")
if self.app.owner:
self.owner_ids.add(self.app.owner.id)
self.dispatch(events.Login())
async
method
astart(self, token)
¶
Asynchronous method to start the bot.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
Your bot's token |
required |
Source code in naff/client/client.py
async def astart(self, token) -> None:
"""
Asynchronous method to start the bot.
Args:
token: Your bot's token
Returns:
"""
await self.login(token)
try:
await self._connection_state.start()
finally:
await self.stop()
method
start(self, token)
¶
Start the bot.
Info
This is the recommended method to start the bot
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
Your bot's token |
required |
Source code in naff/client/client.py
def start(self, token) -> None:
"""
Start the bot.
info:
This is the recommended method to start the bot
Args:
token: Your bot's token
"""
try:
asyncio.run(self.astart(token))
except KeyboardInterrupt:
# ignore, cus this is useless and can be misleading to the
# user
pass
async
method
start_gateway(self)
¶
Starts the gateway connection.
Source code in naff/client/client.py
async def start_gateway(self) -> None:
"""Starts the gateway connection."""
try:
await self._connection_state.start()
finally:
await self.stop()
async
method
stop(self)
¶
Shutdown the bot.
Source code in naff/client/client.py
async def stop(self) -> None:
"""Shutdown the bot."""
logger.debug("Stopping the bot.")
self._ready.clear()
await self.http.close()
await self._connection_state.stop()
method
dispatch(self, event, *args, **kwargs)
¶
Dispatch an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event to be dispatched. |
required |
Source code in naff/client/client.py
def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
"""
Dispatch an event.
Args:
event: The event to be dispatched.
"""
listeners = self.listeners.get(event.resolved_name, [])
if listeners:
logger.debug(f"Dispatching Event: {event.resolved_name}")
event.bot = self
for _listen in listeners:
try:
self._queue_task(_listen, event, *args, **kwargs)
except Exception as e:
raise BotException(
f"An error occurred attempting during {event.resolved_name} event processing"
) from e
_waits = self.waits.get(event.resolved_name, [])
if _waits:
index_to_remove = []
for i, _wait in enumerate(_waits):
result = _wait(event)
if result:
index_to_remove.append(i)
for idx in sorted(index_to_remove, reverse=True):
_waits.pop(idx)
async
method
wait_until_ready(self)
¶
Waits for the client to become ready.
Source code in naff/client/client.py
async def wait_until_ready(self) -> None:
"""Waits for the client to become ready."""
await self._ready.wait()
method
wait_for(self, event, checks, timeout)
¶
Waits for a WebSocket event to be dispatched.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Union[str, BaseEvent] |
The name of event to wait. |
required |
checks |
Union[Callable[..., bool], NoneType, naff.client.const.Missing] |
A predicate to check what to wait for. |
Missing |
timeout |
Optional[float] |
The number of seconds to wait before timing out. |
None |
Returns:
Type | Description |
---|---|
Any |
The event object. |
Source code in naff/client/client.py
def wait_for(
self,
event: Union[str, "BaseEvent"],
checks: Absent[Optional[Callable[..., bool]]] = MISSING,
timeout: Optional[float] = None,
) -> Any:
"""
Waits for a WebSocket event to be dispatched.
Args:
event: The name of event to wait.
checks: A predicate to check what to wait for.
timeout: The number of seconds to wait before timing out.
Returns:
The event object.
"""
event = get_event_name(event)
if event not in self.waits:
self.waits[event] = []
future = asyncio.Future()
self.waits[event].append(Wait(event, checks, future))
return asyncio.wait_for(future, timeout)
async
method
wait_for_modal(self, modal, author, timeout)
¶
Wait for a modal response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
modal |
Modal |
The modal we're waiting for. |
required |
author |
Optional[Snowflake_Type] |
The user we're waiting for to reply |
None |
timeout |
Optional[float] |
A timeout in seconds to stop waiting |
None |
Returns:
Type | Description |
---|---|
ModalContext |
The context of the modal response |
Source code in naff/client/client.py
async def wait_for_modal(
self,
modal: "Modal",
author: Optional["Snowflake_Type"] = None,
timeout: Optional[float] = None,
) -> ModalContext:
"""
Wait for a modal response.
Args:
modal: The modal we're waiting for.
author: The user we're waiting for to reply
timeout: A timeout in seconds to stop waiting
Returns:
The context of the modal response
Raises:
`asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed
"""
author = to_snowflake(author) if author else None
def predicate(event) -> bool:
if modal.custom_id != event.context.custom_id:
return False
if author and author != to_snowflake(event.context.author):
return False
return True
resp = await self.wait_for("modal_response", predicate, timeout)
return resp.context
async
method
wait_for_component(self, messages, components, check, timeout)
¶
Waits for a component to be sent to the bot.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages |
Union[naff.models.discord.message.Message, int, list] |
The message object to check for. |
None |
components |
Union[List[List[Union[BaseComponent, dict]]], List[Union[BaseComponent, dict]], BaseComponent, dict] |
The components to wait for. |
None |
check |
Optional[Callable] |
A predicate to check what to wait for. |
None |
timeout |
Optional[float] |
The number of seconds to wait before timing out. |
None |
Returns:
Type | Description |
---|---|
Component |
|
Source code in naff/client/client.py
async def wait_for_component(
self,
messages: Union[Message, int, list] = None,
components: Optional[
Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
] = None,
check: Optional[Callable] = None,
timeout: Optional[float] = None,
) -> "Component":
"""
Waits for a component to be sent to the bot.
Args:
messages: The message object to check for.
components: The components to wait for.
check: A predicate to check what to wait for.
timeout: The number of seconds to wait before timing out.
Returns:
`Component` that was invoked. Use `.context` to get the `ComponentContext`.
Raises:
`asyncio.TimeoutError` if timed out
"""
if not (messages or components):
raise ValueError("You must specify messages or components (or both)")
message_ids = (
to_snowflake_list(messages) if isinstance(messages, list) else to_snowflake(messages) if messages else None
)
custom_ids = list(get_components_ids(components)) if components else None
# automatically convert improper custom_ids
if custom_ids and not all(isinstance(x, str) for x in custom_ids):
custom_ids = [str(i) for i in custom_ids]
def _check(event: Component) -> bool:
ctx: ComponentContext = event.context
# if custom_ids is empty or there is a match
wanted_message = not message_ids or ctx.message.id in (
[message_ids] if isinstance(message_ids, int) else message_ids
)
wanted_component = not custom_ids or ctx.custom_id in custom_ids
if wanted_message and wanted_component:
if check is None or check(event):
return True
return False
return False
return await self.wait_for("component", checks=_check, timeout=timeout)
method
listen(self, event_name)
¶
A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_name |
Union[str, naff.client.const.Missing] |
The event name to use, if not the coroutine name |
Missing |
Returns:
Type | Description |
---|---|
Listener |
A listener that can be used to hook into the event. |
Source code in naff/client/client.py
def listen(self, event_name: Absent[str] = MISSING) -> Listener:
"""
A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.
Args:
event_name: The event name to use, if not the coroutine name
Returns:
A listener that can be used to hook into the event.
"""
def wrapper(coro: Callable[..., Coroutine]) -> Listener:
listener = Listener.create(event_name)(coro)
self.add_listener(listener)
return listener
return wrapper
method
add_event_processor(self, event_name)
¶
A decorator to be used to add event processors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_name |
Union[str, naff.client.const.Missing] |
The event name to use, if not the coroutine name |
Missing |
Returns:
Type | Description |
---|---|
Callable[..., Coroutine] |
A function that can be used to hook into the event. |
Source code in naff/client/client.py
def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
"""
A decorator to be used to add event processors.
Args:
event_name: The event name to use, if not the coroutine name
Returns:
A function that can be used to hook into the event.
"""
def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
name = event_name
if name is MISSING:
name = coro.__name__
name = name.lstrip("_")
name = name.removeprefix("on_")
self.processors[name] = coro
return coro
return wrapper
method
add_listener(self, listener)
¶
Add a listener for an event, if no event is passed, one is determined.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
listener |
Listener |
The listener to add to the client |
required |
Source code in naff/client/client.py
def add_listener(self, listener: Listener) -> None:
"""
Add a listener for an event, if no event is passed, one is determined.
Args:
listener Listener: The listener to add to the client
"""
# check that the required intents are enabled
event_class_name = "".join([name.capitalize() for name in listener.event.split("_")])
if event_class := globals().get(event_class_name):
if required_intents := _INTENT_EVENTS.get(event_class): # noqa
if not any(required_intent in self.intents for required_intent in required_intents):
self.logger.warning(
f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
)
if listener.event not in self.listeners:
self.listeners[listener.event] = []
self.listeners[listener.event].append(listener)
method
add_interaction(self, command)
¶
Add a slash command to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
InteractionCommand |
The command to add |
required |
Source code in naff/client/client.py
def add_interaction(self, command: InteractionCommand) -> bool:
"""
Add a slash command to the client.
Args:
command InteractionCommand: The command to add
"""
if self.debug_scope:
command.scopes = [self.debug_scope]
# for SlashCommand objs without callback (like objects made to hold group info etc)
if command.callback is None:
return False
for scope in command.scopes:
if scope not in self.interactions:
self.interactions[scope] = {}
elif command.resolved_name in self.interactions[scope]:
old_cmd = self.interactions[scope][command.resolved_name]
raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")
if self.enforce_interaction_perms:
command.checks.append(command._permission_enforcer) # noqa : w0212
self.interactions[scope][command.resolved_name] = command
return True
method
add_prefixed_command(self, command)
¶
Add a prefixed command to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
PrefixedCommand |
The command to add |
required |
Source code in naff/client/client.py
def add_prefixed_command(self, command: PrefixedCommand) -> None:
"""
Add a prefixed command to the client.
Args:
command PrefixedCommand: The command to add
"""
# check that the required intent is enabled or the prefix is a mention
prefixes = (
self.default_prefix
if not isinstance(self.default_prefix, str) and not self.default_prefix == MENTION_PREFIX
else (self.default_prefix,)
)
if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
self.logger.warning(
f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
)
command._parse_parameters()
if self.prefixed_commands.get(command.name):
raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
self.prefixed_commands[command.name] = command
for alias in command.aliases:
if self.prefixed_commands.get(alias):
raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
self.prefixed_commands[alias] = command
method
add_component_callback(self, command)
¶
Add a component callback to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
ComponentCommand |
The command to add |
required |
Source code in naff/client/client.py
def add_component_callback(self, command: ComponentCommand) -> None:
"""
Add a component callback to the client.
Args:
command: The command to add
"""
for listener in command.listeners:
# I know this isn't an ideal solution, but it means we can lookup callbacks with O(1)
if listener not in self._component_callbacks.keys():
self._component_callbacks[listener] = command
continue
else:
raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")
method
add_modal_callback(self, command)
¶
Add a modal callback to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
ModalCommand |
The command to add |
required |
Source code in naff/client/client.py
def add_modal_callback(self, command: ModalCommand) -> None:
"""
Add a modal callback to the client.
Args:
command: The command to add
"""
for listener in command.listeners:
if listener not in self._modal_callbacks.keys():
self._modal_callbacks[listener] = command
continue
else:
raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")
async
method
synchronise_interactions(self, *, scopes, delete_commands)
¶
Synchronise registered interactions with discord.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scopes |
list |
Optionally specify which scopes are to be synced |
Missing |
delete_commands |
Union[bool, naff.client.const.Missing] |
Override the client setting and delete commands |
Missing |
Source code in naff/client/client.py
async def synchronise_interactions(
self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
) -> None:
"""
Synchronise registered interactions with discord.
Args:
scopes: Optionally specify which scopes are to be synced
delete_commands: Override the client setting and delete commands
"""
s = time.perf_counter()
_delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
await self._cache_interactions()
if scopes is not MISSING:
cmd_scopes = scopes
elif self.del_unused_app_cmd:
# if we're deleting unused commands, we check all scopes
cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
else:
# if we're not deleting, just check the scopes we have cmds registered in
cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})
local_cmds_json = application_commands_to_dict(self.interactions)
async def sync_scope(cmd_scope) -> None:
sync_needed_flag = False # a flag to force this scope to synchronise
sync_payload = [] # the payload to be pushed to discord
try:
try:
remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
except Forbidden:
logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
return
for local_cmd in self.interactions.get(cmd_scope, {}).values():
# get remote equivalent of this command
remote_cmd_json = next(
(v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
)
# get json representation of this command
local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))
# this works by adding any command we *want* on Discord, to a payload, and synchronising that
# this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls
if sync_needed(local_cmd_json, remote_cmd_json):
# determine if the local and remote commands are out-of-sync
sync_needed_flag = True
sync_payload.append(local_cmd_json)
elif not _delete_cmds and remote_cmd_json:
_remote_payload = {
k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
}
sync_payload.append(_remote_payload)
elif _delete_cmds:
sync_payload.append(local_cmd_json)
sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]
if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
# synchronise commands if flag is set, or commands are to be deleted
logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
sync_response: list[dict] = await self.http.overwrite_application_commands(
self.app.id, sync_payload, cmd_scope
)
self._cache_sync_response(sync_response, cmd_scope)
else:
logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")
except Forbidden as e:
raise InteractionMissingAccess(cmd_scope) from e
except HTTPException as e:
self._raise_sync_exception(e, local_cmds_json, cmd_scope)
await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])
t = time.perf_counter() - s
logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")
method
get_application_cmd_by_id(self, cmd_id)
¶
Get a application command from the internal cache by its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cmd_id |
Snowflake_Type |
The ID of the command |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.naff.application_commands.InteractionCommand] |
The command, if one with the given ID exists internally, otherwise None |
Source code in naff/client/client.py
def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
"""
Get a application command from the internal cache by its ID.
Args:
cmd_id: The ID of the command
Returns:
The command, if one with the given ID exists internally, otherwise None
"""
scope = self._interaction_scopes.get(str(cmd_id), MISSING)
cmd_id = int(cmd_id) # ensure int ID
if scope != MISSING:
for cmd in self.interactions[scope].values():
if int(cmd.cmd_id.get(scope)) == cmd_id:
return cmd
return None
async
method
get_context(self, data, interaction)
¶
Return a context object based on data passed.
Note
If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Union[discord_typings.interactions.receiving.ApplicationCommandGuildInteractionData, discord_typings.interactions.receiving.ApplicationCommandChannelInteractionData, discord_typings.interactions.receiving.GuildUserCommandInteractionData, discord_typings.interactions.receiving.ChannelUserCommandInteractionData, discord_typings.interactions.receiving.ComponentGuildInteractionData, discord_typings.interactions.receiving.ComponentChannelInteractionData, discord_typings.interactions.receiving.AutocompleteGuildInteractionData, discord_typings.interactions.receiving.AutocompleteChannelInteractionData, discord_typings.interactions.receiving.ModalGuildInteractionData, discord_typings.interactions.receiving.ModalChannelInteractionData, dict, naff.models.discord.message.Message] |
The data of the event |
required |
interaction |
bool |
Is this an interaction or not? |
False |
Returns:
Type | Description |
---|---|
naff.models.naff.context.ComponentContext | naff.models.naff.context.AutocompleteContext | naff.models.naff.context.ModalContext | naff.models.naff.context.InteractionContext | naff.models.naff.context.PrefixedContext |
Context object |
Source code in naff/client/client.py
async def get_context(
self, data: InteractionData | dict | Message, interaction: bool = False
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
"""
Return a context object based on data passed.
note:
If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.
Args:
data: The data of the event
interaction: Is this an interaction or not?
Returns:
Context object
"""
# this line shuts up IDE warnings
cls: ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext
if interaction:
match data["type"]:
case InteractionTypes.MESSAGE_COMPONENT:
cls = self.component_context.from_dict(data, self)
case InteractionTypes.AUTOCOMPLETE:
cls = self.autocomplete_context.from_dict(data, self)
case InteractionTypes.MODAL_RESPONSE:
cls = self.modal_context.from_dict(data, self)
case _:
cls = self.interaction_context.from_dict(data, self)
if not cls.channel:
try:
cls.channel = await self.cache.fetch_channel(data["channel_id"])
except Forbidden:
cls.channel = BaseChannel.from_dict_factory(
{"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
)
else:
cls = self.prefixed_context.from_message(self, data)
if not cls.channel:
cls.channel = await self.cache.fetch_channel(data._channel_id)
return cls
method
get_extensions(self, name)
¶
Get all ext with a name or extension name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension, or the name of it's extension |
required |
Returns:
Type | Description |
---|---|
list |
List of Extensions |
Source code in naff/client/client.py
def get_extensions(self, name: str) -> list[Extension]:
"""
Get all ext with a name or extension name.
Args:
name: The name of the extension, or the name of it's extension
Returns:
List of Extensions
"""
if name not in self.ext.keys():
return [ext for ext in self.ext.values() if ext.extension_name == name]
return [self.ext.get(name, None)]
method
get_ext(self, name)
¶
Get a extension with a name or extension name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension, or the name of it's extension |
required |
Returns:
Type | Description |
---|---|
naff.models.naff.extension.Extension | None |
A extension, if found |
Source code in naff/client/client.py
def get_ext(self, name: str) -> Extension | None:
"""
Get a extension with a name or extension name.
Args:
name: The name of the extension, or the name of it's extension
Returns:
A extension, if found
"""
if ext := self.get_extensions(name):
return ext[0]
return None
method
load_extension(self, name, package, **load_kwargs)
¶
Load an extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension. |
required |
package |
str |
The package the extension is in |
None |
load_kwargs |
The auto-filled mapping of the load keyword arguments |
{} |
Source code in naff/client/client.py
def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
"""
Load an extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
load_kwargs: The auto-filled mapping of the load keyword arguments
"""
name = importlib.util.resolve_name(name, package)
if name in self.__modules:
raise Exception(f"{name} already loaded")
module = importlib.import_module(name, package)
try:
setup = getattr(module, "setup", None)
if not setup:
raise ExtensionLoadException(
f"{name} lacks an entry point. Ensure you have a function called `setup` defined in that file"
) from None
setup(self, **load_kwargs)
except ExtensionLoadException:
raise
except Exception as e:
del sys.modules[name]
raise ExtensionLoadException(f"Unexpected Error loading {name}") from e
else:
logger.debug(f"Loaded Extension: {name}")
self.__modules[name] = module
if self.sync_ext and self._ready.is_set():
try:
asyncio.get_running_loop()
except RuntimeError:
return
asyncio.create_task(self.synchronise_interactions())
method
unload_extension(self, name, package, **unload_kwargs)
¶
Unload an extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
The name of the extension. |
required | |
package |
The package the extension is in |
None |
|
unload_kwargs |
The auto-filled mapping of the unload keyword arguments |
{} |
Source code in naff/client/client.py
def unload_extension(self, name, package=None, **unload_kwargs) -> None:
"""
Unload an extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
unload_kwargs: The auto-filled mapping of the unload keyword arguments
"""
name = importlib.util.resolve_name(name, package)
module = self.__modules.get(name)
if module is None:
raise ExtensionNotFound(f"No extension called {name} is loaded")
try:
teardown = getattr(module, "teardown")
teardown(**unload_kwargs)
except AttributeError:
pass
for ext in self.get_extensions(name):
ext.drop(**unload_kwargs)
del sys.modules[name]
del self.__modules[name]
if self.sync_ext and self._ready.is_set():
if self.sync_ext and self._ready.is_set():
try:
asyncio.get_running_loop()
except RuntimeError:
return
asyncio.create_task(self.synchronise_interactions())
method
reload_extension(self, name, package, *, load_kwargs, unload_kwargs)
¶
Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
The name of the extension. |
required | |
package |
The package the extension is in |
None |
|
load_kwargs |
Mapping[str, Any] |
The manually-filled mapping of the load keyword arguments |
None |
unload_kwargs |
Mapping[str, Any] |
The manually-filled mapping of the unload keyword arguments |
None |
Source code in naff/client/client.py
def reload_extension(
self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
) -> None:
"""
Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.
Args:
name: The name of the extension.
package: The package the extension is in
load_kwargs: The manually-filled mapping of the load keyword arguments
unload_kwargs: The manually-filled mapping of the unload keyword arguments
"""
name = importlib.util.resolve_name(name, package)
module = self.__modules.get(name)
if module is None:
logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
return self.load_extension(name, package)
if not load_kwargs:
load_kwargs = {}
if not unload_kwargs:
unload_kwargs = {}
self.unload_extension(name, package, **unload_kwargs)
self.load_extension(name, package, **load_kwargs)
# todo: maybe add an ability to revert to the previous version if unable to load the new one
async
method
fetch_guild(self, guild_id)
¶
Fetch a guild.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The ID of the guild to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
Guild Object if found, otherwise None |
Source code in naff/client/client.py
async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
"""
Fetch a guild.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
guild_id: The ID of the guild to get
Returns:
Guild Object if found, otherwise None
"""
try:
return await self.cache.fetch_guild(guild_id)
except NotFound:
return None
method
get_guild(self, guild_id)
¶
Get a guild.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The ID of the guild to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
Guild Object if found, otherwise None |
Source code in naff/client/client.py
def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
"""
Get a guild.
Note:
This method is an alias for the cache which will return a cached object.
Args:
guild_id: The ID of the guild to get
Returns:
Guild Object if found, otherwise None
"""
return self.cache.get_guild(guild_id)
async
method
create_guild_from_template(self, template_code, name, icon)
¶
Creates a new guild based on a template.
Note
This endpoint can only be used by bots in less than 10 guilds.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_code |
Union[GuildTemplate, str] |
The code of the template to use. |
required |
name |
str |
The name of the guild (2-100 characters) |
required |
icon |
Union[naff.models.discord.file.File, io.IOBase, BinaryIO, pathlib.Path, str, naff.client.const.Missing] |
Location or File of icon to set |
Missing |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
The newly created guild object |
Source code in naff/client/client.py
async def create_guild_from_template(
self,
template_code: Union["GuildTemplate", str],
name: str,
icon: Absent[UPLOADABLE_TYPE] = MISSING,
) -> Optional[Guild]:
"""
Creates a new guild based on a template.
note:
This endpoint can only be used by bots in less than 10 guilds.
Args:
template_code: The code of the template to use.
name: The name of the guild (2-100 characters)
icon: Location or File of icon to set
Returns:
The newly created guild object
"""
if isinstance(template_code, GuildTemplate):
template_code = template_code.code
if icon:
icon = to_image_data(icon)
guild_data = await self.http.create_guild_from_guild_template(template_code, name, icon)
return Guild.from_dict(guild_data, self)
async
method
fetch_channel(self, channel_id)
¶
Fetch a channel.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel_id |
Snowflake_Type |
The ID of the channel to get |
required |
Returns:
Type | Description |
---|---|
Optional[TYPE_ALL_CHANNEL] |
Channel Object if found, otherwise None |
Source code in naff/client/client.py
async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
"""
Fetch a channel.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
channel_id: The ID of the channel to get
Returns:
Channel Object if found, otherwise None
"""
try:
return await self.cache.fetch_channel(channel_id)
except NotFound:
return None
method
get_channel(self, channel_id)
¶
Get a channel.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel_id |
Snowflake_Type |
The ID of the channel to get |
required |
Returns:
Type | Description |
---|---|
Optional[TYPE_ALL_CHANNEL] |
Channel Object if found, otherwise None |
Source code in naff/client/client.py
def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
"""
Get a channel.
Note:
This method is an alias for the cache which will return a cached object.
Args:
channel_id: The ID of the channel to get
Returns:
Channel Object if found, otherwise None
"""
return self.cache.get_channel(channel_id)
async
method
fetch_user(self, user_id)
¶
Fetch a user.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the user to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.User] |
User Object if found, otherwise None |
Source code in naff/client/client.py
async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
"""
Fetch a user.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
user_id: The ID of the user to get
Returns:
User Object if found, otherwise None
"""
try:
return await self.cache.fetch_user(user_id)
except NotFound:
return None
method
get_user(self, user_id)
¶
Get a user.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the user to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.User] |
User Object if found, otherwise None |
Source code in naff/client/client.py
def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
"""
Get a user.
Note:
This method is an alias for the cache which will return a cached object.
Args:
user_id: The ID of the user to get
Returns:
User Object if found, otherwise None
"""
return self.cache.get_user(user_id)
async
method
fetch_member(self, user_id, guild_id)
¶
Fetch a member from a guild.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if its not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the member |
required |
guild_id |
Snowflake_Type |
The ID of the guild to get the member from |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.Member] |
Member object if found, otherwise None |
Source code in naff/client/client.py
async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
"""
Fetch a member from a guild.
Note:
This method is an alias for the cache which will either return a cached object, or query discord for the object
if its not already cached.
Args:
user_id: The ID of the member
guild_id: The ID of the guild to get the member from
Returns:
Member object if found, otherwise None
"""
try:
return await self.cache.fetch_member(guild_id, user_id)
except NotFound:
return None
method
get_member(self, user_id, guild_id)
¶
Get a member from a guild.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the member |
required |
guild_id |
Snowflake_Type |
The ID of the guild to get the member from |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.Member] |
Member object if found, otherwise None |
Source code in naff/client/client.py
def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
"""
Get a member from a guild.
Note:
This method is an alias for the cache which will return a cached object.
Args:
user_id: The ID of the member
guild_id: The ID of the guild to get the member from
Returns:
Member object if found, otherwise None
"""
return self.cache.get_member(guild_id, user_id)
async
method
fetch_scheduled_event(self, guild_id, scheduled_event_id, with_user_count)
¶
Fetch a scheduled event by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_id |
The id of the scheduled event. |
required |
Returns:
Type | Description |
---|---|
Optional[ScheduledEvent] |
The scheduled event if found, otherwise None |
Source code in naff/client/client.py
async def fetch_scheduled_event(
self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
) -> Optional["ScheduledEvent"]:
"""
Fetch a scheduled event by id.
Args:
event_id: The id of the scheduled event.
Returns:
The scheduled event if found, otherwise None
"""
try:
scheduled_event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
return ScheduledEvent.from_dict(scheduled_event_data, self)
except NotFound:
return None
async
method
fetch_custom_emoji(self, emoji_id, guild_id)
¶
Fetch a custom emoji by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
emoji_id |
Snowflake_Type |
The id of the custom emoji. |
required |
guild_id |
Snowflake_Type |
The id of the guild the emoji belongs to. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.emoji.CustomEmoji] |
The custom emoji if found, otherwise None. |
Source code in naff/client/client.py
async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
"""
Fetch a custom emoji by id.
Args:
emoji_id: The id of the custom emoji.
guild_id: The id of the guild the emoji belongs to.
Returns:
The custom emoji if found, otherwise None.
"""
try:
return await self.cache.fetch_emoji(guild_id, emoji_id)
except NotFound:
return None
method
get_custom_emoji(self, emoji_id, guild_id)
¶
Get a custom emoji by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
emoji_id |
Snowflake_Type |
The id of the custom emoji. |
required |
guild_id |
Optional[Snowflake_Type] |
The id of the guild the emoji belongs to. |
None |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.emoji.CustomEmoji] |
The custom emoji if found, otherwise None. |
Source code in naff/client/client.py
def get_custom_emoji(
self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
) -> Optional[CustomEmoji]:
"""
Get a custom emoji by id.
Args:
emoji_id: The id of the custom emoji.
guild_id: The id of the guild the emoji belongs to.
Returns:
The custom emoji if found, otherwise None.
"""
emoji = self.cache.get_emoji(emoji_id)
if emoji and (not guild_id or emoji._guild_id == to_snowflake(guild_id)):
return emoji
return None
async
method
fetch_sticker(self, sticker_id)
¶
Fetch a sticker by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sticker_id |
Snowflake_Type |
The ID of the sticker. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.sticker.Sticker] |
A sticker object if found, otherwise None |
Source code in naff/client/client.py
async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
"""
Fetch a sticker by ID.
Args:
sticker_id: The ID of the sticker.
Returns:
A sticker object if found, otherwise None
"""
try:
sticker_data = await self.http.get_sticker(sticker_id)
return Sticker.from_dict(sticker_data, self)
except NotFound:
return None
async
method
fetch_nitro_packs(self)
¶
List the sticker packs available to Nitro subscribers.
Returns:
Type | Description |
---|---|
Optional[List[StickerPack]] |
A list of StickerPack objects if found, otherwise returns None |
Source code in naff/client/client.py
async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
"""
List the sticker packs available to Nitro subscribers.
Returns:
A list of StickerPack objects if found, otherwise returns None
"""
try:
packs_data = await self.http.list_nitro_sticker_packs()
return [StickerPack.from_dict(data, self) for data in packs_data]
except NotFound:
return None
async
method
fetch_voice_regions(self)
¶
List the voice regions available on Discord.
Returns:
Type | Description |
---|---|
List[VoiceRegion] |
A list of voice regions. |
Source code in naff/client/client.py
async def fetch_voice_regions(self) -> List["VoiceRegion"]:
"""
List the voice regions available on Discord.
Returns:
A list of voice regions.
"""
regions_data = await self.http.list_voice_regions()
regions = VoiceRegion.from_list(regions_data)
return regions
async
method
connect_to_vc(self, guild_id, channel_id, muted, deafened)
¶
Connect the bot to a voice channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
id of the guild the voice channel is in. |
required |
channel_id |
Snowflake_Type |
id of the voice channel client wants to join. |
required |
muted |
bool |
Whether the bot should be muted when connected. |
False |
deafened |
bool |
Whether the bot should be deafened when connected. |
False |
Returns:
Type | Description |
---|---|
ActiveVoiceState |
The new active voice state on successfully connection. |
Source code in naff/client/client.py
async def connect_to_vc(
self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
) -> ActiveVoiceState:
"""
Connect the bot to a voice channel.
Args:
guild_id: id of the guild the voice channel is in.
channel_id: id of the voice channel client wants to join.
muted: Whether the bot should be muted when connected.
deafened: Whether the bot should be deafened when connected.
Returns:
The new active voice state on successfully connection.
"""
return await self._connection_state.voice_connect(guild_id, channel_id, muted, deafened)
method
get_bot_voice_state(self, guild_id)
¶
Get the bot's voice state for a guild.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The target guild's id. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.naff.active_voice_state.ActiveVoiceState] |
The bot's voice state for the guild if connected, otherwise None. |
Source code in naff/client/client.py
def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
"""
Get the bot's voice state for a guild.
Args:
guild_id: The target guild's id.
Returns:
The bot's voice state for the guild if connected, otherwise None.
"""
return self._connection_state.get_voice_state(guild_id)
async
method
change_presence(self, status, activity)
¶
Change the bots presence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status |
Union[str, naff.models.discord.enums.Status] |
The status for the bot to be. i.e. online, afk, etc. |
<Status.ONLINE: 'online'> |
activity |
Union[naff.models.discord.activity.Activity, str] |
The activity for the bot to be displayed as doing. |
None |
Note::
Bots may only be playing
streaming
listening
watching
or competing
, other activity types are likely to fail.
Source code in naff/client/client.py
async def change_presence(
self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
) -> None:
"""
Change the bots presence.
Args:
status: The status for the bot to be. i.e. online, afk, etc.
activity: The activity for the bot to be displayed as doing.
Note::
Bots may only be `playing` `streaming` `listening` `watching` or `competing`, other activity types are likely to fail.
"""
await self._connection_state.change_presence(status, activity)